# direct_url.py 6.7 KB
  1. """ PEP 610 """
  2. import json
  3. import re
  4. from pip._vendor import six
  5. from pip._vendor.six.moves.urllib import parse as urllib_parse
  6. from pip._internal.utils.typing import MYPY_CHECK_RUNNING
  7. if MYPY_CHECK_RUNNING:
  8. from typing import (
  9. Any, Dict, Iterable, Optional, Type, TypeVar, Union
  10. )
  11. T = TypeVar("T")
  12. DIRECT_URL_METADATA_NAME = "direct_url.json"
  13. ENV_VAR_RE = re.compile(r"^\$\{[A-Za-z0-9-_]+\}(:\$\{[A-Za-z0-9-_]+\})?$")
  14. __all__ = [
  15. "DirectUrl",
  16. "DirectUrlValidationError",
  17. "DirInfo",
  18. "ArchiveInfo",
  19. "VcsInfo",
  20. ]
  21. class DirectUrlValidationError(Exception):
  22. pass
  23. def _get(d, expected_type, key, default=None):
  24. # type: (Dict[str, Any], Type[T], str, Optional[T]) -> Optional[T]
  25. """Get value from dictionary and verify expected type."""
  26. if key not in d:
  27. return default
  28. value = d[key]
  29. if six.PY2 and expected_type is str:
  30. expected_type = six.string_types # type: ignore
  31. if not isinstance(value, expected_type):
  32. raise DirectUrlValidationError(
  33. "{!r} has unexpected type for {} (expected {})".format(
  34. value, key, expected_type
  35. )
  36. )
  37. return value
  38. def _get_required(d, expected_type, key, default=None):
  39. # type: (Dict[str, Any], Type[T], str, Optional[T]) -> T
  40. value = _get(d, expected_type, key, default)
  41. if value is None:
  42. raise DirectUrlValidationError("{} must have a value".format(key))
  43. return value
  44. def _exactly_one_of(infos):
  45. # type: (Iterable[Optional[InfoType]]) -> InfoType
  46. infos = [info for info in infos if info is not None]
  47. if not infos:
  48. raise DirectUrlValidationError(
  49. "missing one of archive_info, dir_info, vcs_info"
  50. )
  51. if len(infos) > 1:
  52. raise DirectUrlValidationError(
  53. "more than one of archive_info, dir_info, vcs_info"
  54. )
  55. assert infos[0] is not None
  56. return infos[0]
  57. def _filter_none(**kwargs):
  58. # type: (Any) -> Dict[str, Any]
  59. """Make dict excluding None values."""
  60. return {k: v for k, v in kwargs.items() if v is not None}
  61. class VcsInfo(object):
  62. name = "vcs_info"
  63. def __init__(
  64. self,
  65. vcs, # type: str
  66. commit_id, # type: str
  67. requested_revision=None, # type: Optional[str]
  68. resolved_revision=None, # type: Optional[str]
  69. resolved_revision_type=None, # type: Optional[str]
  70. ):
  71. self.vcs = vcs
  72. self.requested_revision = requested_revision
  73. self.commit_id = commit_id
  74. self.resolved_revision = resolved_revision
  75. self.resolved_revision_type = resolved_revision_type
  76. @classmethod
  77. def _from_dict(cls, d):
  78. # type: (Optional[Dict[str, Any]]) -> Optional[VcsInfo]
  79. if d is None:
  80. return None
  81. return cls(
  82. vcs=_get_required(d, str, "vcs"),
  83. commit_id=_get_required(d, str, "commit_id"),
  84. requested_revision=_get(d, str, "requested_revision"),
  85. resolved_revision=_get(d, str, "resolved_revision"),
  86. resolved_revision_type=_get(d, str, "resolved_revision_type"),
  87. )
  88. def _to_dict(self):
  89. # type: () -> Dict[str, Any]
  90. return _filter_none(
  91. vcs=self.vcs,
  92. requested_revision=self.requested_revision,
  93. commit_id=self.commit_id,
  94. resolved_revision=self.resolved_revision,
  95. resolved_revision_type=self.resolved_revision_type,
  96. )
  97. class ArchiveInfo(object):
  98. name = "archive_info"
  99. def __init__(
  100. self,
  101. hash=None, # type: Optional[str]
  102. ):
  103. self.hash = hash
  104. @classmethod
  105. def _from_dict(cls, d):
  106. # type: (Optional[Dict[str, Any]]) -> Optional[ArchiveInfo]
  107. if d is None:
  108. return None
  109. return cls(hash=_get(d, str, "hash"))
  110. def _to_dict(self):
  111. # type: () -> Dict[str, Any]
  112. return _filter_none(hash=self.hash)
  113. class DirInfo(object):
  114. name = "dir_info"
  115. def __init__(
  116. self,
  117. editable=False, # type: bool
  118. ):
  119. self.editable = editable
  120. @classmethod
  121. def _from_dict(cls, d):
  122. # type: (Optional[Dict[str, Any]]) -> Optional[DirInfo]
  123. if d is None:
  124. return None
  125. return cls(
  126. editable=_get_required(d, bool, "editable", default=False)
  127. )
  128. def _to_dict(self):
  129. # type: () -> Dict[str, Any]
  130. return _filter_none(editable=self.editable or None)
  131. if MYPY_CHECK_RUNNING:
  132. InfoType = Union[ArchiveInfo, DirInfo, VcsInfo]
  133. class DirectUrl(object):
  134. def __init__(
  135. self,
  136. url, # type: str
  137. info, # type: InfoType
  138. subdirectory=None, # type: Optional[str]
  139. ):
  140. self.url = url
  141. self.info = info
  142. self.subdirectory = subdirectory
  143. def _remove_auth_from_netloc(self, netloc):
  144. # type: (str) -> str
  145. if "@" not in netloc:
  146. return netloc
  147. user_pass, netloc_no_user_pass = netloc.split("@", 1)
  148. if (
  149. isinstance(self.info, VcsInfo) and
  150. self.info.vcs == "git" and
  151. user_pass == "git"
  152. ):
  153. return netloc
  154. if ENV_VAR_RE.match(user_pass):
  155. return netloc
  156. return netloc_no_user_pass
  157. @property
  158. def redacted_url(self):
  159. # type: () -> str
  160. """url with user:password part removed unless it is formed with
  161. environment variables as specified in PEP 610, or it is ``git``
  162. in the case of a git URL.
  163. """
  164. purl = urllib_parse.urlsplit(self.url)
  165. netloc = self._remove_auth_from_netloc(purl.netloc)
  166. surl = urllib_parse.urlunsplit(
  167. (purl.scheme, netloc, purl.path, purl.query, purl.fragment)
  168. )
  169. return surl
  170. def validate(self):
  171. # type: () -> None
  172. self.from_dict(self.to_dict())
  173. @classmethod
  174. def from_dict(cls, d):
  175. # type: (Dict[str, Any]) -> DirectUrl
  176. return DirectUrl(
  177. url=_get_required(d, str, "url"),
  178. subdirectory=_get(d, str, "subdirectory"),
  179. info=_exactly_one_of(
  180. [
  181. ArchiveInfo._from_dict(_get(d, dict, "archive_info")),
  182. DirInfo._from_dict(_get(d, dict, "dir_info")),
  183. VcsInfo._from_dict(_get(d, dict, "vcs_info")),
  184. ]
  185. ),
  186. )
  187. def to_dict(self):
  188. # type: () -> Dict[str, Any]
  189. res = _filter_none(
  190. url=self.redacted_url,
  191. subdirectory=self.subdirectory,
  192. )
  193. res[self.info.name] = self.info._to_dict()
  194. return res
  195. @classmethod
  196. def from_json(cls, s):
  197. # type: (str) -> DirectUrl
  198. return cls.from_dict(json.loads(s))
  199. def to_json(self):
  200. # type: () -> str
  201. return json.dumps(self.to_dict(), sort_keys=True)